<!DOCTYPE html>
<html>
  <head>
    <title>Exactly-Once Streaming from Kafka</title>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
    <style type="text/css">
      @import url(http://fonts.googleapis.com/css?family=Yanone+Kaffeesatz);
      @import url(http://fonts.googleapis.com/css?family=Droid+Serif:400,700,400italic);
      @import url(http://fonts.googleapis.com/css?family=Ubuntu+Mono:400,700,400italic);

      body { font-family: 'Droid Serif'; }
      h1, h2, h3 {
        font-family: 'Yanone Kaffeesatz';
        font-weight: normal;
      }
      img {
        width: 100%;
        height: auto;
      }
      img#kixer-logo {
        width: 130px;
        height: 54px;
      }
      ul, ol {
        margin: 6px 0 6px 0;  
      }
      li {
        margin: 0 0 12px 0;  
      }
      .remark-code, .remark-inline-code { font-family: 'Ubuntu Mono'; }
    </style>
  </head>
  <body>
    <textarea id="source">

class: center, middle

# Exactly-Once Streaming from Kafka

cody@koeninger.org

<img src="slides/kixer-logo.png" id="kixer-logo" />

https://github.com/koeninger/kafka-exactly-once

---

## Kafka is a ~~message queue~~ circular buffer

* Split into topic/partition
* Fixed size, based on disk space or time
* Oldest messages deleted to maintain size
* Messages are otherwise immutable
* Indexed only by offset
* Client tracks read offset, not server
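
A minimal sketch of the buffer model above, assuming a hypothetical in-memory `PartitionLog` (an illustration only, not Kafka's API): offsets are assigned on append, the oldest message is dropped once `capacity` is exceeded, and the reader supplies the offset.

```scala
// Hypothetical in-memory model of one topic/partition; not Kafka's API.
final class PartitionLog(capacity: Int) {
  private var messages = Vector.empty[String]
  private var firstOffset = 0L // offset of the oldest retained message

  // Appends a message and returns the offset assigned to it.
  def append(msg: String): Long = {
    messages = messages :+ msg
    if (messages.size > capacity) { // drop the oldest message to maintain size
      messages = messages.tail
      firstOffset += 1
    }
    firstOffset + messages.size - 1
  }

  // The client supplies the offset; the log keeps no per-client position.
  def read(offset: Long): Either[String, String] = {
    val i = offset - firstOffset
    if (i < 0 || i >= messages.size) Left(s"offset $offset out of range")
    else Right(messages(i.toInt))
  }
}
```

A reader that falls behind the retained window gets a `Left`, which is the in-memory analogue of asking Kafka for an offset that has already been deleted.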

### Delivery semantics are _your responsibility_

From Kafka, through a transformation, to results in your data store

---

## At-most-once

1. Save offsets
2. !! Possible failure !!
3. Save results

### On failure, restart at the last saved offset; messages are lost

---

## At-least-once

1. Save results
2. !! Possible failure !!
3. Save offsets

### On failure, messages are repeated

No magic config option can do better than this
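
The two orderings can be compared with a small simulation. This is a sketch only: `Store` and `runBatch` are hypothetical in-memory stand-ins, and `crash = true` simulates a failure between the two saves.

```scala
object DeliveryOrder {
  // Hypothetical in-memory stand-in for a results store plus an offset store.
  final class Store {
    var results: Vector[String] = Vector.empty
    var nextOffset: Int = 0
  }

  // Processes everything in `log` from the saved offset onward as one batch.
  // When `crash` is true, the run stops between the first and second save.
  def runBatch(log: Vector[String], store: Store, offsetsFirst: Boolean, crash: Boolean): Unit = {
    val from = store.nextOffset
    val batch = log.drop(from)
    def saveOffsets(): Unit = { store.nextOffset = from + batch.size }
    def saveResults(): Unit = { store.results = store.results ++ batch.map(_.toUpperCase) }
    if (offsetsFirst) { saveOffsets(); if (!crash) saveResults() } // at-most-once order
    else { saveResults(); if (!crash) saveOffsets() }              // at-least-once order
  }
}
```

Running a crashed batch followed by a restart loses the batch when offsets are saved first, and writes it twice when results are saved first.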

---

## Idempotent exactly-once

1. Save results with a natural unique key
2. !! Possible failure !!
3. Save offsets

### On failure, messages are repeated, but we don't care

Immutable messages and a pure transformation yield the same results
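
A minimal sketch of why a replay is harmless here, assuming a hypothetical `Order` message whose `id` serves as the natural unique key and a plain `Map` standing in for the data store:

```scala
object IdempotentSave {
  // Hypothetical message with a natural unique key (the order id).
  final case class Order(id: String, amount: Int)

  // Upsert by key: saving the same result twice leaves the store unchanged.
  // The transformation (amount to cents) is pure, so a replay recomputes
  // exactly the same value for each key.
  def save(store: Map[String, Int], batch: Seq[Order]): Map[String, Int] =
    batch.foldLeft(store)((s, o) => s.updated(o.id, o.amount * 100))
}
```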

---

## Idempotent pros / cons

Pro:
* Simple
* Works well for shape-preserving transformations (map)

Con:
* May be hard to identify natural unique key
* Especially hard for aggregate transformations (fold)
* Won't work for destructive updates

Note:
* Results and offsets may be in different data stores

---

## Transactional exactly-once

1. Begin transaction
2. Save results
3. Save offsets
4. Ensure offsets are ok (increasing without gaps)
5. Commit transaction

### On failure, rollback, results and offsets remain in sync
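
The steps above can be sketched with a hypothetical in-memory `Db` whose `inTransaction` swaps in the new state only if the whole body succeeds. Names and types here are assumptions for illustration, not a real database API.

```scala
object TransactionalSave {
  // Results and the next expected offset live in the same store.
  final case class State(results: Vector[String], nextOffset: Long)

  // Hypothetical in-memory "database".
  final class Db(var state: State) {
    // An exception thrown by `body` leaves `state` untouched (rollback).
    def inTransaction(body: State => State): Unit = { state = body(state) }
  }

  def saveBatch(db: Db, fromOffset: Long, msgs: Vector[String], failAfterResults: Boolean): Unit =
    db.inTransaction { s =>
      val withResults = s.copy(results = s.results ++ msgs.map(_.toUpperCase)) // save results
      if (failAfterResults) throw new RuntimeException("simulated failure")
      require(fromOffset == s.nextOffset, "offset gap or replay")              // ensure offsets are ok
      withResults.copy(nextOffset = fromOffset + msgs.size)                    // save offsets
    }
}
```

A failed or replayed batch throws before the state is replaced, so results and offsets never drift apart.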

---

## Transactional pros / cons

Pro:
* Works easily for any transformation
* Destructive updates ok

Con:
* More complex
* Requires a transactional data store

Note:
* Results and offsets must be in same data store

---

![](slides/kafka-old.png)

---
## Receiver-based stream pros / cons

Pro:
* WAL design could work with non-Kafka data sources

Con:
* Long running receivers make parallelism awkward and costly
* Duplication of write operations
* Dependent on HDFS
* Must use idempotence for exactly-once
* No access to offsets, can't use transactional approach

---

![](slides/kafka-new.png)

---

## Direct stream pros / cons

Pro:
* Spark partition 1:1 Kafka topic/partition, easy cheap parallelism
* No duplicate writes
* No dependency on HDFS
* Access to offsets, can use idempotent or transactional

Con:
* Specific to Kafka
* Need adequate Kafka retention (OffsetOutOfRange is _your fault_)

---

## Don't care about semantics?

![](slides/spark-kafka-change-cpu-utilization.png)

### How about server cost?

---

## Basic direct stream API

```scala
val stream: InputDStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streamingContext,
    Map(
      "metadata.broker.list" -> "localhost:9092,anotherhost:9092",
      "auto.offset.reset" -> "largest"
    ),
    Set("sometopic", "anothertopic")
  )
```

---

## Basic direct stream API delivery semantics

auto.offset.reset -> largest:
* Starts at latest offset, thus losing data
* Not at-most-once by itself (need to set spark.task.maxFailures as well)

auto.offset.reset -> smallest:
* Starts at earliest offset
* At-least-once, but replays whole log

### If you want finer-grained control, you must store offsets somewhere
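
A sketch of the choice being described, using a hypothetical `resolve` helper (this illustrates the policy only and is not how Spark or Kafka resolve offsets internally): a stored offset wins, otherwise the reset policy picks one end of the log.

```scala
object StartingOffset {
  // earliest: oldest retained offset; latest: offset the next message will get.
  def resolve(stored: Option[Long], reset: String, earliest: Long, latest: Long): Long =
    stored.getOrElse(reset match {
      case "largest"  => latest   // skip whatever is already in the log
      case "smallest" => earliest // replay everything still retained
      case other      => throw new IllegalArgumentException(s"unknown reset policy: $other")
    })
}
```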

---

## Where to store offsets

Spark Checkpoint:
* Easy
* No need to access offsets, will be used on restart
* Must use idempotent, not transactional
* Checkpoints may not be recoverable

Your own data store:
* Complex
* Need to access offsets, save them, and provide them on (re)start
* Idempotent or transactional
* Offsets are just as recoverable as your results

---

## Spark checkpoint

Same as any other Spark checkpoint:

```scala
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val stream = KafkaUtils.createDirectStream(...) // setup DStream
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(
  checkpointDirectory,
  functionToCreateContext _)
```

Keep in mind you still need idempotent storage of results.

Other than that, you're done.

---

## Providing offsets on (re)start

```scala
// begin from the offsets committed to the database
val fromOffsets = DB.readOnly { implicit session =>
  sql"select topic, part, off from txn_offsets".
   map { resultSet =>
     TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
   }.list.apply().toMap
}

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => mmd.message.length

val stream: InputDStream[Int] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Int](
    streamingContext,
    Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092"),
    fromOffsets,
    messageHandler
  )
```

This is the more advanced API. Instead of a set of topics, it takes:
* a map of TopicAndPartition -> the offset to start from
* a function to extract the desired value from each message and metadata.

---
## Accessing offsets, per message

```scala
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) =>
    (mmd.topic, mmd.partition, mmd.offset, mmd.key, mmd.message)
```

Your message handler has full access to all of the metadata.

This may not be the most efficient way, though.

---
## Accessing offsets, per batch

```scala
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get an array of OffsetRange
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = rdd.someTransformationUsingSparkMethods
  ...
  // Your save method. Note that this runs on the driver
  mySaveBatch(offsetRanges, results)
}
```

Each OffsetRange in the array has the following fields:
* topic: Kafka topic name
* partition: Kafka partition id
* fromOffset: inclusive starting offset
* untilOffset: exclusive ending offset
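
Because the start is inclusive and the end is exclusive, sizes and gap checks reduce to simple arithmetic. The sketch below uses a local stand-in case class, `OffsetRangeSketch`, that mirrors the four fields above; it is not Spark's own class.

```scala
object OffsetRanges {
  // Local stand-in mirroring the four fields listed above; not Spark's class.
  final case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    // Inclusive start, exclusive end: the size is a plain difference.
    def count: Long = untilOffset - fromOffset
  }

  // Two batches for the same topic/partition have no gap or overlap
  // when one ends exactly where the next begins.
  def contiguous(prev: OffsetRangeSketch, next: OffsetRangeSketch): Boolean =
    prev.topic == next.topic && prev.partition == next.partition &&
      prev.untilOffset == next.fromOffset
}
```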

---
## Accessing offsets, per partition
This is safe because each Spark partition is 1:1 with a Kafka topic/partition
```scala
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get an array of OffsetRange
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // index to get the correct offset range for the rdd partition we're working on
    val offsetRange: OffsetRange = offsetRanges(TaskContext.get.partitionId)

    val perPartitionResult = iter.someTransformationUsingScalaMethods

    // Your save method.  Note this runs on the executors.
    mySavePartition(offsetRange, perPartitionResult)
  }
}
```
This is **not safe** because there is a shuffle, so no longer 1:1
```scala
rdd.reduceByKey.foreachPartition { ...
```
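
A toy illustration of why the index stops being meaningful, with plain collections standing in for RDD partitions (all names and data here are hypothetical): before the regrouping, partition i holds exactly the messages of offset range i; afterwards it does not.

```scala
object ShuffleBreaksMapping {
  // Message count of each Kafka offset range, indexed by Spark partition id.
  val rangeCounts: Vector[Long] = Vector(3L, 1L)

  // Direct stream: partition i holds exactly the messages of range i.
  val direct: Vector[Vector[String]] = Vector(Vector("a", "b", "a"), Vector("b"))

  // Stand-in for a shuffle: records are regrouped by key across partitions.
  val shuffled: Vector[Vector[String]] =
    direct.flatten.groupBy(identity).toVector.sortBy(_._1).map(_._2)

  // True when every partition still lines up with the range at its index.
  def aligned(partitions: Vector[Vector[String]]): Boolean =
    partitions.map(_.size.toLong) == rangeCounts
}
```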

---
## Storing offsets transactionally

Make sure that the start of this offset range matches the end of the last saved offset range

```scala
// localTx is transactional
DB.localTx { implicit session =>
  // store results
  ...

  // store offsets; osr is the OffsetRange being saved
  val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
    where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
  """.update.apply()

  if (offsetRows != 1) {
    // rollback and/or throw exception
  }
}
```
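
The guarded update behaves like a compare-and-set. A minimal in-memory sketch, assuming a hypothetical `advance` helper over a `Map` keyed by (topic, partition):

```scala
object OffsetTable {
  type Key = (String, Int) // (topic, partition)

  // Mimics the guarded update: the row changes only if the stored offset
  // equals fromOffset. Returns the new table and the number of rows updated.
  def advance(table: Map[Key, Long], key: Key, fromOffset: Long, untilOffset: Long): (Map[Key, Long], Int) =
    if (table.get(key).contains(fromOffset)) (table.updated(key, untilOffset), 1)
    else (table, 0)
}
```

A replayed range and a missing row both report zero rows updated, so in this model the offset row for each topic/partition has to exist before the first batch is saved.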
---
class: center, middle

# Questions?

cody@koeninger.org

<img src="slides/kixer-logo.png" id="kixer-logo" />

https://github.com/koeninger/kafka-exactly-once

    </textarea>
    <script src="slides/remark-latest.min.js">
    </script>
    <script>
      var slideshow = remark.create();
    </script>
  </body>
</html>
